package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimAsyncJoinFunction;
import com.atguigu.bean.TradeProvinceOrderWindow;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.KafkaUtil;
import com.atguigu.utils.MyClickHouseUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

//数据流：web/app -> Mysql -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)
//程  序：Mock -> Mysql -> Maxwell -> Kafka(ZK) -> DwdTradeOrderDetailApp -> Kafka(ZK) -> Dws10TradeProvinceOrderWindow(Redis,HDFS,ZK,HBase,Phoenix) -> ClickHouse(ZK)
/**
 * Flink streaming job that aggregates order-detail records per province and writes the
 * result to ClickHouse.
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol>
 *   <li>read the Kafka topic {@code dwd_trade_order_detail};</li>
 *   <li>parse every record into a {@link JSONObject}, dropping records that carry no JSON value;</li>
 *   <li>key by the order-detail {@code id} and emit only the last record seen for each id,
 *       10 seconds (processing time) after the first one arrived;</li>
 *   <li>map to {@link TradeProvinceOrderWindow}, assign event-time timestamps and watermarks
 *       (2 seconds of allowed out-of-orderness);</li>
 *   <li>key by province id and aggregate in 10-second tumbling event-time windows;</li>
 *   <li>asynchronously look up the province name in {@code DIM_BASE_PROVINCE};</li>
 *   <li>print the result and insert it into {@code dws_trade_province_order_window}.</li>
 * </ol>
 */
public class Dws10TradeProvinceOrderWindow_04 {

    /**
     * Builds the job graph and submits it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The program needs to be started from a Checkpoint or a Savepoint
        //2.1 Enable checkpointing every 5 seconds and specify the checkpoint consistency semantics
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Set the checkpoint timeout to 1 minute
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 Set the minimum pause between two checkpoints
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.5 Specify the automatic restart strategy
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // 2.6 Set the state backend
        //env.setStateBackend(new EmbeddedRocksDBStateBackend(true) );
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // 2.7 Set the user name used to access HDFS
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Read the DWD order-detail topic and create the stream
        String topic = "dwd_trade_order_detail";
        String groupId = "province_order_220718";
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3. Convert the data to JSON objects and filter out records without a value
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                if (value == null) {
                    return;
                }

                // fastjson yields null (instead of throwing) for blank input or the literal "null".
                // Emitting that null would fail in the keyBy below, so such records are skipped too.
                JSONObject parsed = JSON.parseObject(value);
                if (parsed != null) {
                    out.collect(parsed);
                }
            }
        });

        //TODO 4. Key by order-detail id and de-duplicate the data (duplicates are produced by the LEFT JOIN)
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.keyBy(json -> json.getString("id"))
                .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {

                    // Latest record received for the current key; cleared once it has been emitted
                    private ValueState<JSONObject> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<JSONObject>("value-state", JSONObject.class));
                    }

                    @Override
                    public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {

                        JSONObject state = valueState.value();
                        if (state == null) {
                            // First record for this key: schedule the emission 10 seconds from now (processing time)
                            TimerService timerService = ctx.timerService();
                            long ts = timerService.currentProcessingTime();
                            timerService.registerProcessingTimeTimer(ts + 10000L);
                        }

                        // Always keep the most recently received record; earlier ones are overwritten
                        valueState.update(value);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<JSONObject> out) throws Exception {
                        // Emit the retained record and reset the state so a later record starts a new cycle
                        JSONObject value = valueState.value();
                        valueState.clear();
                        out.collect(value);
                    }
                });

        filterDS.print("filterDS>>>>>>>>>>>>");

        //TODO 5. Convert the data to JavaBean objects
        SingleOutputStreamOperator<TradeProvinceOrderWindow> tradeProvinceOrderDS = filterDS.map(jsonObject -> {

            // Each record starts with a one-element set; the sets are merged in the window reduce
            HashSet<String> orderIds = new HashSet<>();
            orderIds.add(jsonObject.getString("order_id"));

            return TradeProvinceOrderWindow.builder()
                    .provinceId(jsonObject.getString("province_id"))
                    .orderIdSet(orderIds)
                    .orderAmount(jsonObject.getBigDecimal("split_total_amount"))
                    .ts(DateFormatUtil.toTs(jsonObject.getString("create_time"), true))
                    .build();
        });

        //TODO 6. Extract the timestamp and generate watermarks
        SingleOutputStreamOperator<TradeProvinceOrderWindow> tradeProvinceOrderWithWMDS = tradeProvinceOrderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeProvinceOrderWindow>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TradeProvinceOrderWindow>() {
            @Override
            public long extractTimestamp(TradeProvinceOrderWindow element, long recordTimestamp) {
                return element.getTs();
            }
        }));

        //TODO 7. Key by province id, then window and aggregate
        SingleOutputStreamOperator<TradeProvinceOrderWindow> reduceDS = tradeProvinceOrderWithWMDS.keyBy(TradeProvinceOrderWindow::getProvinceId)
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeProvinceOrderWindow>() {
                    @Override
                    public TradeProvinceOrderWindow reduce(TradeProvinceOrderWindow value1, TradeProvinceOrderWindow value2) throws Exception {
                        // Accumulate into value1: union of order ids and sum of amounts
                        value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                        value1.setOrderAmount(value1.getOrderAmount().add(value2.getOrderAmount()));
                        return value1;
                    }
                }, new WindowFunction<TradeProvinceOrderWindow, TradeProvinceOrderWindow, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<TradeProvinceOrderWindow> input, Collector<TradeProvinceOrderWindow> out) throws Exception {
                        // Get the data: only the first element of the iterable (the reduced value) is used
                        TradeProvinceOrderWindow next = input.iterator().next();

                        // Set the de-duplicated order count
                        next.setOrderCount((long) next.getOrderIdSet().size());

                        // Fill in the remaining fields: current system time and the window boundaries
                        next.setTs(System.currentTimeMillis());
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        // Emit the data
                        out.collect(next);
                    }
                });

        //TODO 8. Join with the province table to add dimension information
        SingleOutputStreamOperator<TradeProvinceOrderWindow> reduceWithProvinceDS = AsyncDataStream.unorderedWait(reduceDS,
                new DimAsyncJoinFunction<TradeProvinceOrderWindow>("DIM_BASE_PROVINCE") {
                    @Override
                    public String getKey(TradeProvinceOrderWindow input) {
                        return input.getProvinceId();
                    }

                    @Override
                    public void join(TradeProvinceOrderWindow input, JSONObject dimInfo) {
                        input.setProvinceName(dimInfo.getString("NAME"));
                    }
                }, 100, TimeUnit.SECONDS);

        //TODO 9. Write the data out to ClickHouse
        reduceWithProvinceDS.print(">>>>>>");
        reduceWithProvinceDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_trade_province_order_window values(?,?,?,?,?,?,?)"));

        //TODO 10. Start the job
        env.execute("Dws10TradeProvinceOrderWindow");

    }
}